Dubbo Feature - 02. 并发控制

Overview

了解本部分请先参考: 6.28 并发控制 · GitBook

相关代码位于 dubbo-rpc-api

根据每个示例, 找到Dubbo对应实现代码的过程很简单. 由于Dubbo中几乎所有的配置都有一个Key, 所以直接在Constants(dubbo-common)中找到对应的Key调用即可. 比如配置executes对应的就是Constants.EXECUTES_KEY. 查找调用关系可以直接定位到ExecuteLimitFilter.

Filter加载过程

ExecuteLimitFilter

首先来看下Provider端ExecuteLimitFilter的加载过程. 在dubbo-rpc-api中, src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter有配置executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter.

Fillter的加载源于ProtocolFilterWrapper.buildInvokerChain, 在这里断点可以看到调用是从ServiceBean开始的:

1
2
3
4
ServiceBean.export
-> Protocol$Adaptive.export
-> ProtocolFilterWrapper.export
-> ProtocolFilterWrapper.buildInvokerChain

其中获取所有Filter的代码如下:

1
ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

根据之前对于Dubbo SPI的了解, getActivateExtension这一方法中, 扩展类需要满足ExtensionLoader.isMatchGroup, 以及isActive时才会返回, 所以在Provider端中, 且配置了executes属性时, 标记了@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)ExecuteLimitFilter会被放到整个Filter Chain中.

buildInvokerChain中传入的invokerJDKProxyFactory$1 (AbstractProxyInvoker). 在此方法中, 返回了一个Invoker, 观察invoke(invocation)方法的实现, 实际上是将invoker的invoke方法之外包裹了多层Filter, 而其中Filter中也会调用下一个Filter, 由此构成了一个责任链模式.

回想业务方法的栈

1
2
3
4
5
6
7
8
9
10
11
12
13
ChannelEventRunnable.run
-> DecodeHandler.received
-> HeaderExchangeHandler.recieved
-> HeaderExchangeHandler.handleRequest
-> DubboProtocol$1.reply // 匿名类ExchangeHandler
-> ProtocolFilterWrapper$1.invoke // 匿名类Invoker, Filter介入
-> EchoFilter.invoke
... 每个Filter和ProtocolFilterWrapper$1的反复
-> RegisterProtocol$InvokerDelegete.invoke
-> DelegateProviderMetaDataInvoker.invoke
-> JavassistProxyFactory$1.invoke
-> Wrapper1.invokeMethod
-> DemoService.sayHello

注: $1表示代码中的匿名类.

ActiveLimitFilter

Consumer端的ActiveLimitFilter加载过程类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ClassPathXmlApplicatonContext.getBean
-> ReferenceBean.crateProxy
-> Protocol$Adaptive.refer
-> QosProtocolWrapper.refer
-> ProtocolListenerWrapper.refer
-> ProtocolFilterWrapper.refer
-> RegistryProtocol.refer
-> RegistryDirectory.subscribe
-> ZookeeperRegistry.subscribe
-> ZookeeperRegistry.notify
-> RegistryDirectory.toInvokers
-> QosProtocolWrapper.refer
另起一行
-> ProtocolListenerWrapper.refer
-> ProtocolFilterWrapper.refer
-> ProtocolFilterWrapper.buildInvokerChain

buildInvokerChain的参数是一个DubboInvoker.

涉及这一系列Filter的调用栈如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$Proxy11.sayHello
-> InvokerInvocationHandler.invoke
-> MockClusterInvoker.invoke
-> FailoverClusterInvoker.invoke
-> RegisterDirectory$InvokerDelegate.invoke
-> ListenerInvokerWrapper.invoker
-> ProtocolFilterWrapper$1.invoke
-> ConsumerContextFilter.invoke
... ProtocolFilterWrapper$1.invoke与filter交替
-> DubboInvoker.invoke // DubboInvoker

另起一行, 开始数据交互
-> RefercnceCountExchangeClient.request
-> HeaderExchangeClient.request
-> HeaderExchangeChannel.request
-> NettyClient.send

并发控制实现

ExecuteLimitHandler

ExecuteLimitHandler的实现非常简单. 注意, 并发控制是方法级别的: Dubbo为每个方法对应了一个RpcStatus, 通过其中的Semaphore对象来控制线程数量, 没有tryAcquire成功会抛出异常:

1
2
3
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url
+ ", cause: The service using threads greater than <dubbo:service executes=\"" + max
+ "\" /> limited.");

值得注意的是, 每个方法会有一个对应的RPCStatus. 所以executes直接配置给某个类时, 指的是其中每个方法的并发限制, 而不是所有方法并发的总和.

这个基于Semaphore的实现非常简单, 为了实现更彻底的控制和隔离, 有人接入了Hystrix, 也是通过Dubbo Filter进行扩展的. 请参考下方链接.

ActiveLimitFilter

ActiveLimitFilter直接通过RPCStatus的计数功能来进行并发控制, 如果达到指定active数量, 会一直阻塞直到超时, 再抛出异常

1
2
3
4
5
throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);

这里的阻塞是直接通过RPCStatus继承自Objectwait,notify实现的.

另外值得注意的一点是虽然文档中表示:

限制 com.foo.BarService 的每个方法,每客户端并发执行(或占用连接的请求数)不能超过 10 个:

但实际上是, 客户端对每个Provider该方法的调用不能超过10个. 由于METHOD_STATISTICS中的Key是URL串, 比如 dubbo://10.1.1.1:20880/com.alibaba.dubbo.demo.DemoService. 所以对于不同Provider的相同方法, 实际上获取到的是两个不同的RPCStatus实例. 故可得到以上结论.

See Also